{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Text Vectorization Pipeline\n",
    "\n",
    "This example illustrates how Dask-ML can be used to classify large textual datasets in parallel.\n",
    "It is adapted from [this scikit-learn example](https://scikit-learn.org/stable/auto_examples/applications/plot_out_of_core_classification.html#sphx-glr-auto-examples-applications-plot-out-of-core-classification-py).\n",
    "\n",
    "The primary differences are that\n",
    "\n",
    "* We fit the entire model, including text vectorization, as a pipeline.\n",
    "* We use dask collections like [Dask Bag](https://docs.dask.org/en/latest/bag.html), [Dask Dataframe](https://docs.dask.org/en/latest/dataframe.html), and [Dask Array](https://docs.dask.org/en/latest/array.html)\n",
    "  rather than generators to work with larger than memory datasets."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from dask.distributed import Client, progress\n",
    "\n",
    "client = Client(n_workers=2, threads_per_worker=2, memory_limit='2GB')\n",
    "client"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Fetch the data\n",
    "\n",
    "Scikit-Learn provides a utility to fetch the newsgroups dataset."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import sklearn.datasets\n",
    "\n",
    "bunch = sklearn.datasets.fetch_20newsgroups()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The data from scikit-learn isn't *too* large, so the data is just\n",
    "returned in memory. Each document is a string. The target we're predicting\n",
    "is an integer, which codes the topic of the post.\n",
    "\n",
    "We'll load the documents and targets directly into a dask DataFrame.\n",
    "In practice, on a larger than memory dataset, you would likely load the\n",
    "documents from disk or cloud storage using `dask.bag` or `dask.delayed`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import dask.dataframe as dd\n",
    "import pandas as pd\n",
    "\n",
    "df = dd.from_pandas(pd.DataFrame({\"text\": bunch.data, \"target\": bunch.target}),\n",
    "                    npartitions=25)\n",
    "\n",
    "df"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Each row in the `text` column has a bit of metadata and the full text of a post."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "print(df.head().loc[0, 'text'][:500])"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Feature Hashing"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Dask's [HashingVectorizer](https://ml.dask.org/modules/generated/dask_ml.feature_extraction.text.HashingVectorizer.html#dask_ml.feature_extraction.text.HashingVectorizer) provides a similar API to [scikit-learn's implementation](https://scikit-learn.org/stable/modules/generated/sklearn.feature_extraction.text.HashingVectorizer.html). In fact, Dask-ML's implementation uses scikit-learn's, applying it to each partition of the input `dask.dataframe.Series` or `dask.bag.Bag`.\n",
    "\n",
    "Transformation, once we actually compute the result, happens in parallel and returns a dask Array."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import dask_ml.feature_extraction.text\n",
    "\n",
    "vect = dask_ml.feature_extraction.text.HashingVectorizer()\n",
    "X = vect.fit_transform(df['text'])\n",
    "X"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The output array `X` has unknown chunk sizes becase the input dask Series or Bags don't know their own length.\n",
    "\n",
    "Each block in `X` is a `scipy.sparse` matrix."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "X.blocks[0].compute()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "This is a document-term matrix. Each row is the hashed representation of the original post."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Classification Pipeline\n",
    "\n",
    "We can combine the [HashingVectorizer](https://ml.dask.org/modules/generated/dask_ml.feature_extraction.text.HashingVectorizer.html#dask_ml.feature_extraction.text.HashingVectorizer) with [Incremental](https://ml.dask.org/modules/generated/dask_ml.wrappers.Incremental.html#dask_ml.wrappers.Incremental) and a classifier like scikit-learn's `SGDClassifier` to\n",
    "create a classification pipeline.\n",
    "\n",
    "We'll predict whether the topic was in the `comp` category."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "bunch.target_names"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import numpy as np\n",
    "\n",
    "positive = np.arange(len(bunch.target_names))[['comp' in x for x in bunch.target_names]]\n",
    "y = df['target'].isin(positive).astype(int)\n",
    "y"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import numpy as np\n",
    "import sklearn.linear_model\n",
    "import sklearn.pipeline\n",
    "\n",
    "import dask_ml.wrappers"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Because the input comes from a dask Series, with unknown chunk sizes, we need to specify `assume_equal_chunks=True`. This tells Dask-ML that we know that each partition in `X`\n",
    "matches a partition in `y`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "sgd = sklearn.linear_model.SGDClassifier(\n",
    "    tol=1e-3\n",
    ")\n",
    "clf = dask_ml.wrappers.Incremental(\n",
    "    sgd, scoring='accuracy', assume_equal_chunks=True\n",
    ")\n",
    "pipe = sklearn.pipeline.make_pipeline(vect, clf)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "`SGDClassifier.partial_fit` needs to know the full set of classes up front.\n",
    "Becuase our `sgd` is wrapped inside an `Incremental`, we need to pass it through\n",
    "as the `incremental__classes` keyword argument in `fit`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "pipe.fit(df['text'], y,\n",
    "         incremental__classes=[0, 1]);"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "As usual, `Incremental.predict` lazily returns the predictions as a dask Array."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "predictions = pipe.predict(df['text'])\n",
    "predictions"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We can compute the predictions and score in parallel with `dask_ml.metrics.accuracy_score`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "dask_ml.metrics.accuracy_score(y, predictions)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "This simple combination of a HashingVectorizer and SGDClassifier is\n",
    "pretty effective at this prediction task."
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.4"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
